
---
--- tests around access tracking within transaction blocks
---
CREATE SCHEMA access_tracking;
SET search_path TO 'access_tracking';

-- idempotently add node to allow this test to run without add_coordinator
SET client_min_messages TO WARNING;
SELECT 1 FROM citus_set_coordinator_host('localhost', :master_port);
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
RESET client_min_messages;

SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 90930500;

CREATE OR REPLACE FUNCTION relation_select_access_mode(relationId Oid)
    RETURNS int
    LANGUAGE C STABLE STRICT
    AS 'citus', $$relation_select_access_mode$$;

CREATE OR REPLACE FUNCTION relation_dml_access_mode(relationId Oid)
    RETURNS int
    LANGUAGE C STABLE STRICT
    AS 'citus', $$relation_dml_access_mode$$;

CREATE OR REPLACE FUNCTION relation_ddl_access_mode(relationId Oid)
    RETURNS int
    LANGUAGE C STABLE STRICT
    AS 'citus', $$relation_ddl_access_mode$$;

CREATE OR REPLACE FUNCTION distributed_relation(relation_name text)
RETURNS bool AS
$$
DECLARE
	part_method char;
BEGIN
		 select partmethod INTO part_method from pg_dist_partition  WHERE logicalrelid = relation_name::regclass;
         IF part_method = 'h' THEN
                RETURN true;
         ELSE
                RETURN false;
         END IF;
END;
$$ LANGUAGE 'plpgsql' IMMUTABLE;


CREATE OR REPLACE FUNCTION relation_access_mode_to_text(relation_name text, relationShardAccess int)
RETURNS text AS
$$
BEGIN
	 IF relationShardAccess = 0 and distributed_relation(relation_name) THEN
	 	RETURN 'not_parallel_accessed';
	 ELSIF relationShardAccess = 0 and NOT distributed_relation(relation_name) THEN
	 	RETURN 'not_accessed';
	 ELSIF relationShardAccess = 1 THEN
	 	RETURN 'reference_table_access';
	 ELSE
		RETURN 'parallel_access';
	 END IF;
END;
$$ LANGUAGE 'plpgsql' IMMUTABLE;



CREATE VIEW relation_accesses AS
	SELECT table_name,
			relation_access_mode_to_text(table_name, relation_select_access_mode(table_name::regclass)) as select_access,
			relation_access_mode_to_text(table_name, relation_dml_access_mode(table_name::regclass)) as dml_access,
			relation_access_mode_to_text(table_name, relation_ddl_access_mode(table_name::regclass)) as ddl_access
	FROM
		((SELECT 'table_' || i as table_name FROM generate_series(1, 7) i) UNION (SELECT 'partitioning_test') UNION (SELECT 'partitioning_test_2009') UNION (SELECT 'partitioning_test_2010')) tables;

SET citus.shard_replication_factor TO 1;
CREATE TABLE table_1 (key int, value int);
SELECT create_distributed_table('table_1', 'key');

CREATE TABLE table_2 (key int, value int);
SELECT create_distributed_table('table_2', 'key');

CREATE TABLE table_3 (key int, value int);
SELECT create_distributed_table('table_3', 'key');

CREATE TABLE table_4 (key int, value int);
SELECT create_distributed_table('table_4', 'key');

CREATE TABLE table_5 (key int, value int);
SELECT create_distributed_table('table_5', 'key');

CREATE TABLE table_6 (key int, value int);
SELECT create_reference_Table('table_6');

INSERT INTO table_1 SELECT i, i FROM generate_series(0,100) i;
INSERT INTO table_2 SELECT i, i FROM generate_series(0,100) i;
INSERT INTO table_3 SELECT i, i FROM generate_series(0,100) i;
INSERT INTO table_4 SELECT i, i FROM generate_series(0,100) i;
INSERT INTO table_5 SELECT i, i FROM generate_series(0,100) i;
INSERT INTO table_6 SELECT i, i FROM generate_series(0,100) i;

-- create_distributed_table works fine
BEGIN;
	CREATE TABLE table_7 (key int, value int);
	SELECT create_distributed_table('table_7', 'key');
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_7')  ORDER BY 1;
COMMIT;

-- outside the transaction blocks, the function always returns zero
SELECT count(*) FROM table_1;
SELECT * FROM relation_accesses WHERE table_name = 'table_1';

-- a very simple test that first checks sequential
-- and parallel SELECTs,DMLs, and DDLs
BEGIN;
	SELECT * FROM relation_accesses WHERE table_name = 'table_1';
	SELECT count(*) FROM table_1 WHERE key = 1;
	SELECT * FROM relation_accesses WHERE table_name = 'table_1';
	SELECT count(*) FROM table_1 WHERE key = 1 OR key = 2;
	SELECT * FROM relation_accesses WHERE table_name = 'table_1';
	INSERT INTO table_1 VALUES (1,1);
	SELECT * FROM relation_accesses WHERE table_name = 'table_1';
	INSERT INTO table_1 VALUES (1,1), (2,2);
	SELECT * FROM relation_accesses WHERE table_name = 'table_1';
	ALTER TABLE table_1 ADD COLUMN test_col INT;

	-- now see that the other tables are not accessed at all
	SELECT * FROM relation_accesses WHERE table_name = 'table_1';

ROLLBACK;


-- this test shows that even if two multiple single shard
-- commands executed, we can treat the transaction as sequential
BEGIN;
	SELECT count(*) FROM table_1 WHERE key = 1;
	SELECT * FROM relation_accesses WHERE table_name = 'table_1';
	SELECT count(*) FROM table_1 WHERE key = 2;
	SELECT * FROM relation_accesses WHERE table_name = 'table_1';
	INSERT INTO table_1 VALUES (1,1);
	SELECT * FROM relation_accesses WHERE table_name = 'table_1';
	INSERT INTO table_1 VALUES (2,2);
	SELECT * FROM relation_accesses WHERE table_name = 'table_1';
ROLLBACK;

-- a sample DDL example
BEGIN;
	ALTER TABLE table_1 ADD CONSTRAINT table_1_u UNIQUE (key);
	SELECT * FROM relation_accesses WHERE table_name = 'table_1';
ROLLBACK;

-- a simple join touches single shard per table
BEGIN;
	SELECT
		count(*)
	FROM
		table_1, table_2, table_3, table_4, table_5
	WHERE
		table_1.key = table_2.key AND table_2.key = table_3.key AND
		table_3.key = table_4.key AND table_4.key = table_5.key AND
		table_1.key = 1;

		SELECT * FROM relation_accesses WHERE table_name LIKE 'table_%' ORDER BY 1;
ROLLBACK;

-- a simple real-time join touches all shard per table
BEGIN;
	SELECT
		count(*)
	FROM
		table_1, table_2
	WHERE
		table_1.key = table_2.key;

		SELECT * FROM relation_accesses WHERE table_name IN ('table_1', 'table_2')  ORDER BY 1;
ROLLBACK;

-- a simple real-time join touches all shard per table
-- in sequential mode
BEGIN;

	SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
	SELECT
		count(*)
	FROM
		table_1, table_2
	WHERE
		table_1.key = table_2.key;

		SELECT * FROM relation_accesses WHERE table_name IN ('table_1', 'table_2')  ORDER BY 1;
ROLLBACK;

-- a simple subquery pushdown that touches all shards
BEGIN;
	SELECT
		count(*)
	FROM
	(

		SELECT
			random()
		FROM
			table_1, table_2, table_3, table_4, table_5
		WHERE
			table_1.key = table_2.key AND table_2.key = table_3.key AND
			table_3.key = table_4.key AND table_4.key = table_5.key
	) as foo;

	SELECT * FROM relation_accesses WHERE table_name LIKE 'table_%' ORDER BY 1;
ROLLBACK;

-- simple multi shard update both sequential and parallel modes
-- note that in multi shard modify mode we always add select
-- access for all the shards accessed. But, sequential mode is OK
BEGIN;
	UPDATE table_1 SET value = 15;
	SELECT * FROM relation_accesses WHERE table_name = 'table_1';
	SET LOCAL citus.multi_shard_modify_mode = 'sequential';
	UPDATE table_2 SET value = 15;
	SELECT * FROM relation_accesses WHERE table_name IN ('table_1', 'table_2')  ORDER BY 1;
ROLLBACK;

-- now UPDATE/DELETE with subselect pushdown
BEGIN;
	UPDATE
		table_1 SET value = 15
	WHERE key IN (SELECT key FROM table_2 JOIN table_3 USING (key) WHERE table_2.value = 15);
	SELECT * FROM relation_accesses WHERE table_name IN ('table_1', 'table_2', 'table_3')  ORDER BY 1;
ROLLBACK;

-- INSERT .. SELECT pushdown
BEGIN;
	INSERT INTO table_2 SELECT * FROM table_1;
	SELECT * FROM relation_accesses WHERE table_name IN ('table_1', 'table_2')  ORDER BY 1;
ROLLBACK;

-- INSERT .. SELECT pushdown in sequential mode should be OK
BEGIN;
	SET LOCAL citus.multi_shard_modify_mode = 'sequential';

	INSERT INTO table_2 SELECT * FROM table_1;
	SELECT * FROM relation_accesses WHERE table_name IN ('table_1', 'table_2')  ORDER BY 1;
ROLLBACK;

-- coordinator INSERT .. SELECT
BEGIN;
    -- We use offset 1 to make sure the result needs to be pulled to the coordinator, offset 0 would be optimized away
	INSERT INTO table_2 SELECT * FROM table_1 OFFSET 1;
	SELECT * FROM relation_accesses WHERE table_name IN ('table_1', 'table_2')  ORDER BY 1;
ROLLBACK;



-- recursively planned SELECT
BEGIN;
	SELECT
		count(*)
	FROM
	(

		SELECT
			random()
		FROM
			table_1, table_2
		WHERE
			table_1.key = table_2.key
			OFFSET 0
	) as foo;

	SELECT * FROM relation_accesses  WHERE table_name IN ('table_1', 'table_2')  ORDER BY 1;
ROLLBACK;

-- recursively planned SELECT and coordinator INSERT .. SELECT
BEGIN;
	INSERT INTO table_3 (key)
	SELECT
		*
	FROM
	(

		SELECT
			random() * 1000
		FROM
			table_1, table_2
		WHERE
			table_1.key = table_2.key
			OFFSET 0
	) as foo;

	SELECT * FROM relation_accesses  WHERE table_name IN ('table_1', 'table_2', 'table_3')  ORDER BY 1;
ROLLBACK;

-- recursively planned SELECT and coordinator INSERT .. SELECT
-- but modifies single shard, marked as sequential operation
BEGIN;
	INSERT INTO table_3 (key)
	SELECT
		*
	FROM
	(

		SELECT
			random() * 1000
		FROM
			table_1, table_2
		WHERE
			table_1.key = table_2.key
		AND table_1.key = 1
			OFFSET 0
	) as foo;

	SELECT * FROM relation_accesses  WHERE table_name IN ('table_1', 'table_2', 'table_3')  ORDER BY 1;
ROLLBACK;

-- recursively planned SELECT and recursively planned multi-shard DELETE
BEGIN;
	DELETE FROM table_3 where key IN
	(
		SELECT
			*
		FROM
		(
			SELECT
				table_1.key
			FROM
				table_1, table_2
			WHERE
				table_1.key = table_2.key
				OFFSET 0
		) as foo
	) AND value IN (SELECT key FROM table_4);

	SELECT * FROM relation_accesses  WHERE table_name IN ('table_1', 'table_2', 'table_3', 'table_4')  ORDER BY 1;
ROLLBACK;

-- copy out
BEGIN;
	COPY (SELECT * FROM table_1 WHERE key IN (1,2,3) ORDER BY 1) TO stdout;
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_1')  ORDER BY 1;
ROLLBACK;

-- copy in
BEGIN;
	COPY table_1 FROM STDIN WITH CSV;
1,1
2,2
3,3
\.
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_1')  ORDER BY 1;
ROLLBACK;

-- copy in single shard
BEGIN;
	COPY table_1 FROM STDIN WITH CSV;
1,1
1,2
1,3
\.
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_1')  ORDER BY 1;
ROLLBACK;

-- reference table accesses should always be a sequential
BEGIN;
	SELECT count(*) FROM table_6;
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_6');

	UPDATE table_6 SET value = 15;
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_6');

	ALTER TABLE table_6 ADD COLUMN x INT;
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_6');
ROLLBACK;

-- reference table join with a distributed table
BEGIN;
	SELECT count(*) FROM table_1 JOIN table_6 USING(key);
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_6', 'table_1') ORDER BY 1,2;
ROLLBACK;

-- TRUNCATE should be DDL
BEGIN;
	TRUNCATE table_1;
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_1')  ORDER BY 1;
ROLLBACK;

-- TRUNCATE can be a sequential DDL
BEGIN;
	SET LOCAL citus.multi_shard_modify_mode = 'sequential';
	TRUNCATE table_1;
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_1')  ORDER BY 1;
ROLLBACK;

-- TRUNCATE on a reference table should be sequential
BEGIN;
	TRUNCATE table_6;
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_6')  ORDER BY 1;
ROLLBACK;

-- creating foreign keys should consider adding the placement accesses for the referenced table
ALTER TABLE table_1 ADD CONSTRAINT table_1_u UNIQUE (key);
BEGIN;
	ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key);
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_1', 'table_2')  ORDER BY 1;
ROLLBACK;

-- creating foreign keys should consider adding the placement accesses for the referenced table
-- in sequential mode as well
BEGIN;
	SET LOCAL citus.multi_shard_modify_mode = 'sequential';

	ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key);
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_1', 'table_2')  ORDER BY 1;
ROLLBACK;

CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
SELECT create_distributed_table('partitioning_test', 'id');

-- Adding partition tables via CREATE TABLE should have DDL access the partitioned table as well
BEGIN;
	CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
	SELECT * FROM relation_accesses  WHERE table_name IN ('partitioning_test', 'partitioning_test_2009')  ORDER BY 1;
ROLLBACK;

-- Adding partition tables via ATTACH PARTITION on local tables should have DDL access the partitioned table as well
CREATE TABLE partitioning_test_2009 AS SELECT * FROM partitioning_test;
BEGIN;
	ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2009 FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
	SELECT * FROM relation_accesses  WHERE table_name IN ('partitioning_test', 'partitioning_test_2009')  ORDER BY 1;
COMMIT;

-- Adding partition tables via ATTACH PARTITION on distributed tables should have DDL access the partitioned table as well
CREATE TABLE partitioning_test_2010 AS SELECT * FROM partitioning_test;
SELECT create_distributed_table('partitioning_test_2010', 'id');
BEGIN;
	ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2010 FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
	SELECT * FROM relation_accesses  WHERE table_name IN ('partitioning_test', 'partitioning_test_2010')  ORDER BY 1;
COMMIT;

-- reading from partitioned table marks all of its partitions
BEGIN;
	SELECT count(*) FROM partitioning_test;
	SELECT * FROM relation_accesses  WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')  ORDER BY 1;
COMMIT;

-- reading from partitioned table sequentially marks all of its partitions with sequential accesses
BEGIN;
	SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
	SELECT count(*) FROM partitioning_test;
	SELECT * FROM relation_accesses  WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')  ORDER BY 1;
COMMIT;

-- updating partitioned table marks all of its partitions
BEGIN;
	UPDATE partitioning_test SET time = now();
	SELECT * FROM relation_accesses  WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')  ORDER BY 1;
COMMIT;

-- updating partitioned table sequentially marks all of its partitions with sequential accesses
BEGIN;
	SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
	UPDATE partitioning_test SET time = now();
	SELECT * FROM relation_accesses  WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')  ORDER BY 1;
COMMIT;


-- DDLs on partitioned table marks all of its partitions
BEGIN;
	ALTER TABLE partitioning_test ADD COLUMN X INT;
	SELECT * FROM relation_accesses  WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')  ORDER BY 1;
ROLLBACK;

-- DDLs on partitioned table sequentially marks all of its partitions with sequential accesses
BEGIN;
	SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
	ALTER TABLE partitioning_test ADD COLUMN X INT;
	SELECT * FROM relation_accesses  WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')  ORDER BY 1;
ROLLBACK;


-- reading from partition table marks its parent
BEGIN;
	SELECT count(*) FROM partitioning_test_2009;
	SELECT * FROM relation_accesses  WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')  ORDER BY 1;
COMMIT;

-- rreading from partition table marks its parent with sequential accesses
BEGIN;
	SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
	SELECT count(*) FROM partitioning_test_2009;
	SELECT * FROM relation_accesses  WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')  ORDER BY 1;
COMMIT;

-- updating from partition table marks its parent
BEGIN;
	UPDATE partitioning_test_2009 SET time = now();
	SELECT * FROM relation_accesses  WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')  ORDER BY 1;
COMMIT;

-- updating from partition table marks its parent sequential accesses
BEGIN;
	SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
	UPDATE partitioning_test_2009 SET time = now();
	SELECT * FROM relation_accesses  WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')  ORDER BY 1;
COMMIT;


-- DDLs on partition table marks its parent
BEGIN;
	CREATE INDEX i1000000 ON partitioning_test_2009 (id);
	SELECT * FROM relation_accesses  WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')  ORDER BY 1;
ROLLBACK;

-- DDLs on partition table marks its parent in sequential mode
BEGIN;
	SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
	CREATE INDEX i1000000 ON partitioning_test_2009 (id);
	SELECT * FROM relation_accesses  WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')  ORDER BY 1;
ROLLBACK;

-- TRUNCATE CASCADE works fine
ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key);
BEGIN;
	TRUNCATE table_1 CASCADE;
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_1', 'table_2')  ORDER BY 1;
ROLLBACK;

-- CTEs with SELECT only should work fine
BEGIN;

	WITH cte AS (SELECT count(*) FROM table_1)
	SELECT * FROM cte;
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_1')  ORDER BY 1;
COMMIT;

-- CTEs with SELECT only in sequential mode should work fine
BEGIN;
	SET LOCAL citus.multi_shard_modify_mode = 'sequential';

	WITH cte AS (SELECT count(*) FROM table_1)
	SELECT * FROM cte;
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_1')  ORDER BY 1;
COMMIT;

-- modifying CTEs should work fine with multi-row inserts, which are by default in sequential
BEGIN;

	WITH cte_1 AS (INSERT INTO table_1 VALUES (1000,1000), (1001, 1001), (1002, 1002) RETURNING *)
	SELECT * FROM cte_1 ORDER BY 1;
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_1')  ORDER BY 1;
ROLLBACK;

-- modifying CTEs should work fine with parallel mode
BEGIN;

	WITH cte_1 AS (UPDATE table_1 SET value = 15 RETURNING *)
	SELECT count(*) FROM cte_1 ORDER BY 1;
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_1')  ORDER BY 1;
ROLLBACK;

-- modifying CTEs should work fine with sequential mode
BEGIN;
	SET LOCAL citus.multi_shard_modify_mode = 'sequential';

	WITH cte_1 AS (UPDATE table_1 SET value = 15 RETURNING *)
	SELECT count(*) FROM cte_1 ORDER BY 1;
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_1')  ORDER BY 1;
ROLLBACK;

-- router planned modifying CTEs should work fine with parallel mode
BEGIN;

	WITH cte_1 AS (UPDATE table_1 SET value = 15 WHERE key = 6 RETURNING *)
	SELECT count(*) FROM cte_1 ORDER BY 1;
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_1')  ORDER BY 1;
ROLLBACK;

-- router planned modifying CTEs should work fine with sequential mode
BEGIN;
	SET LOCAL citus.multi_shard_modify_mode = 'sequential';

	WITH cte_1 AS (UPDATE table_1 SET value = 15 WHERE key = 6 RETURNING *)
	SELECT count(*) FROM cte_1 ORDER BY 1;
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_1')  ORDER BY 1;
ROLLBACK;

-- create distributed table with data loading
-- should mark both parallel dml and parallel ddl
DROP TABLE table_3;
CREATE TABLE table_3 (key int, value int);
INSERT INTO table_3 SELECT i, i FROM generate_series(0,100) i;
BEGIN;
	SELECT create_distributed_table('table_3', 'key');
	SELECT * FROM relation_accesses  WHERE table_name IN ('table_3')  ORDER BY 1;
COMMIT;

SET search_path TO 'public';
DROP SCHEMA access_tracking CASCADE;
